[spark] initiate fluss-spark and introduce spark catalog and table #2219

wuchong merged 2 commits into apache:main
Conversation
@wuchong please take a look, thanks.
wuchong left a comment:
Thanks @YannByron for the great work! I’ve left a few comments for consideration.
Additionally, it would be great if we could add Javadoc or explanatory comments to the key classes and methods; this would greatly improve readability and maintainability for future contributors.
fluss-spark/pom.xml (outdated):

```xml
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<argLine>-ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} -Dio.netty.tryReflectionSetAccessible=true</argLine>
<filereports>PaimonTestSuite.txt</filereports>
```

```xml
<version>0.9-SNAPSHOT</version>
</parent>

<artifactId>fluss-spark-common</artifactId>
```
Should we add a Scala version suffix to the artifact ID (also for fluss-spark-3.4 and fluss-spark-3.5 modules)? This would ensure that the published JARs automatically include the Scala version in their artifact names during Maven deployment, following standard Scala cross-build conventions.
Sure. I was going to hold off on this until Scala 2.13 or Spark 4 is supported, but let me do it in this PR.
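The cross-build convention being discussed can be sketched roughly as follows. This is illustrative only: the property name follows the common `scala.binary.version` convention used by Scala cross-built projects, and the actual pom layout in the PR may differ.

```xml
<!-- Illustrative sketch, not the actual fluss-spark pom: suffix the
     artifactId with the Scala binary version so published JARs carry it. -->
<properties>
  <scala.binary.version>2.12</scala.binary.version>
</properties>

<artifactId>fluss-spark-common_${scala.binary.version}</artifactId>
```

Projects such as Flink and Spark follow this pattern so that artifacts built against different Scala versions can coexist in Maven repositories.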
pom.xml (outdated):

```xml
<profile>
  <id>spark3</id>
  <modules>
    <module>fluss-spark/fluss-spark-3.5</module>
    <module>fluss-spark/fluss-spark-3.4</module>
  </modules>
  <activation>
    <activeByDefault>true</activeByDefault>
    <property>
      <name>spark3</name>
    </property>
  </activation>
</profile>
```
I think we can enable these modules by default, so that the license checker pipeline can verify them as well.
```scala
import scala.collection.JavaConverters._

class FlussCatalog extends TableCatalog with SupportsFlussNamespaces with WithFlussAdmin {
```
How about naming it SparkCatalog? Since these catalog implementations reside in the Fluss repository alongside those for other engines (such as Flink and Trino), including the engine name in the class name would make it easier to identify and distinguish between them.
```scala
import org.apache.fluss.metadata.TableInfo
import org.apache.fluss.spark.catalog.{FlussTableInfo, SupportsFlussPartitionManagement}

case class FlussTable(table: TableInfo)
```
```scala
SparkDataTypes.createMapType(
  mapType.getKeyType.accept(this),
  mapType.getValueType.accept(this),
  mapType.isNullable
```
This should be mapType.getValueType.isNullable — the value-nullability flag of the map should come from the value type, not from the nullability of the map type itself.
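The fix being suggested can be illustrated with a minimal, self-contained sketch. The case classes below are hypothetical stand-ins for illustration, not the actual Fluss or Spark type classes:

```scala
// Hypothetical stand-ins, not the real Fluss/Spark type classes.
case class FlussMapType(keyNullable: Boolean, valueNullable: Boolean, mapNullable: Boolean)
case class SparkMapType(valueContainsNull: Boolean)

// The converted map's valueContainsNull flag should come from the *value
// type's* nullability, not from the nullability of the map type itself.
def toSparkMap(m: FlussMapType): SparkMapType =
  SparkMapType(valueContainsNull = m.valueNullable) // not m.mapNullable
```

Using the map's own nullability here would mark non-nullable values as nullable (or vice versa) whenever the two flags disagree.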
```scala
import scala.collection.JavaConverters._

object FlussDataTypeToSparkDataType extends DataTypeVisitor[SparkDataType] {
```
Rename FlussDataTypeToSparkDataType to FlussToSparkTypeVisitor to align with SparkToFlussTypeVisitor.
OK. The other subclasses of DataTypeVisitor can also follow this rule.
```scala
val (tableProps, customProps) =
  caseInsensitiveProps.filterNot(SPARK_TABLE_OPTIONS.contains).partition {
    case (key, _) => FlussConfigUtils.TABLE_OPTIONS.containsKey(key)
```
FlussConfigUtils.TABLE_OPTIONS is a static set; however, a Fluss table created by a newer client version may carry additional table options. Therefore, it would be more robust to check whether the config key starts with the table. prefix.
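A minimal sketch of the prefix-based check the comment suggests. The property map below is illustrative, not actual Fluss option keys:

```scala
// Illustrative properties; real Fluss option keys may differ.
val caseInsensitiveProps = Map(
  "table.replication.factor" -> "3",       // option known to this client
  "table.some-newer-option"  -> "enabled", // option added by a newer client
  "owner"                    -> "alice"    // custom (non-table) property
)

// Partition on the "table." prefix instead of membership in a static set,
// so table options introduced by newer clients are still classified correctly.
val (tableProps, customProps) =
  caseInsensitiveProps.partition { case (key, _) => key.startsWith("table.") }
```

With a static set, `table.some-newer-option` would fall into the custom bucket; the prefix check keeps it with the table options.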
.scalafmt.conf (outdated):

```
["org.apache.paimon\\..*"],
["org.apache.paimon.shade\\..*"],
```
```scala
.field("pt", DataTypes.STRING())
.build())
assertThat(testPartitionedTable.getPartitionKeys.get(0)).isEqualTo("pt")
assertThat(testPartitionedTable.getCustomProperties.containsKey("key")).isEqualTo(true)
```
(branch updated from 6c21e44 to 4f8e790)
wuchong left a comment:

LGTM. I rebased the branch to trigger tests on the latest main and appended a commit to map Spark BinaryType to Fluss BytesType, because BinaryType in Fluss is a fixed-size binary type, not a VarbinaryType.
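The binary-type mapping described above can be sketched as follows; the ADT here is a simplified stand-in for the real Fluss type system, not the actual classes:

```scala
// Simplified stand-ins for the Fluss data types (illustrative only).
sealed trait FlussDataType
case object BytesType extends FlussDataType              // variable-length binary
case class BinaryType(length: Int) extends FlussDataType // fixed-size binary

// Spark's BinaryType holds variable-length byte arrays, so it maps to
// Fluss BytesType rather than Fluss's fixed-size BinaryType.
def fromSparkBinaryType: FlussDataType = BytesType
```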
Purpose
To introduce the Spark engine for #155.

This is the first PR, which includes the fluss-spark-common, fluss-spark-ut and fluss-spark-3.x modules.

Linked issue: close #228
Brief change log
Tests
API and Format
Documentation